
(EN) Toronto Stock Exchange
(FR) Bourse de Toronto
| Eder Valderrama | eder.valderrama@usp.br | 2023 |
(EN) Import packages to configure folders and Spark
Project Description:

(FR) Importer les packages pour configurer les dossiers et Spark
Description du projet :

import os
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark import SparkConf
import pandas as pd
import plotly.express as px
import matplotlib.pyplot as plt
# Resolve the local parquet folder ("./Canada") and the companies
# parquet file inside it to absolute paths.
dirname_parquet = Path('./Canada/')
working_dir = os.getcwd()
folder_name = os.path.join(working_dir, dirname_parquet)
folder_name  # notebook display of the resolved folder
file_name_path_i = os.path.join(folder_name, "dataframe_companies_full.parquet")
file_name_path_i  # notebook display of the resolved file path
'/app/Dev/Project/00_portfolio/00_stock_market_CA/Canada/dataframe_companies_full.parquet'
# Pick which Spark sizing profile to use below ('balance' or 'moderate').
_available_profiles = ['balance', 'moderate']
configuration_choice = _available_profiles[1]
print(configuration_choice)
moderate
# Spark sizing profiles: driver/executor heap, executor count and default
# partition count per profile (both appear tuned for an 8-core machine).
conf = SparkConf()
_spark_profiles = {
    'balance': {
        'spark.driver.memory': '1g',        # heap size for the driver
        'spark.executor.memory': '5g',      # heap size for the executors
        'spark.executor.instances': '8',    # executors == machine cores
        'spark.default.parallelism': '8',   # partitions == machine cores
    },
    'moderate': {
        'spark.driver.memory': '2g',
        'spark.executor.memory': '4g',
        'spark.executor.instances': '6',
        'spark.default.parallelism': '8',
    },
}
# An unrecognized profile name leaves `conf` at Spark defaults, exactly as
# the original if/elif chain did.
for _key, _value in _spark_profiles.get(configuration_choice, {}).items():
    conf.set(_key, _value)
%%time
# Build (or reuse) the SparkSession configured with the profile chosen above.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .appName("ParquetSplit")
    .getOrCreate()
)
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 23/07/25 16:28:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
CPU times: user 46.4 ms, sys: 41.7 ms, total: 88.1 ms Wall time: 7.48 s
# Sanity check: display the currently active SparkSession.
spark.getActiveSession()
SparkSession - in-memory
%%time
# Read the full companies parquet dataset into a Spark DataFrame.
df = spark.read.parquet(file_name_path_i)
CPU times: user 67.1 ms, sys: 9.5 ms, total: 76.6 ms Wall time: 1min 58s
%%time
# Show the column names and types of the companies DataFrame.
df.printSchema()
root |-- url_company: string (nullable = true) |-- description: string (nullable = true) |-- sector: string (nullable = true) |-- industry: string (nullable = true) |-- website: string (nullable = true) |-- company_name: string (nullable = true) |-- codes: string (nullable = true) CPU times: user 4.95 ms, sys: 342 µs, total: 5.29 ms Wall time: 34 ms
# Target directory for the sector-partitioned copy of the dataset, and a
# check that it does not exist yet (Spark fails on an existing path).
file_name_dir_partition = os.path.join(
    folder_name, 'parquet_partitioned_by_sector'
)
file_name_dir_partition  # notebook display of the target directory
os.path.exists(file_name_dir_partition)
False
%%time
# Rewrite the dataset as parquet, one subdirectory per distinct `sector` value.
df.write.partitionBy("sector").parquet(file_name_dir_partition)
CPU times: user 157 ms, sys: 43.5 ms, total: 201 ms Wall time: 6min 5s
# List what Spark wrote: one `sector=...` directory per value, plus _SUCCESS markers.
os.listdir(file_name_dir_partition)
['._SUCCESS.crc', 'sector=-', 'sector=Consumer Discretionary', 'sector=Consumer Staples', 'sector=Energy', 'sector=Finance', 'sector=Healthcare', 'sector=Industrials', 'sector=Materials', 'sector=Media', 'sector=Real Estate', 'sector=Sectors', 'sector=Technology', 'sector=Utilities', '_SUCCESS']
%%time
# Collect the entire Spark DataFrame into driver memory as a pandas DataFrame.
df_pandas = df.toPandas()
[Stage 1:========================================================>(80 + 1) / 81]
CPU times: user 233 ms, sys: 270 ms, total: 504 ms Wall time: 1min 52s
%%time
# Number of companies for every (sector, industry) combination.
sector_industry_counts = df_pandas.groupby(['sector', 'industry'])['company_name'].count()
df_grouped = sector_industry_counts.reset_index()
CPU times: user 8.12 ms, sys: 5.58 ms, total: 13.7 ms Wall time: 13 ms
# Preview the first rows of the per-(sector, industry) counts.
df_grouped.head()
| sector | industry | company_name | |
|---|---|---|---|
| 0 | - | - | 1263 |
| 1 | Consumer Discretionary | Apparel & Luxury | 4 |
| 2 | Consumer Discretionary | Containers & Packaging | 6 |
| 3 | Consumer Discretionary | Home and Homeware | 3 |
| 4 | Consumer Discretionary | Hotels, Lodging & Leisure | 9 |
# Companies per sector, with the count column renamed for readability.
df_grouped_sector = df_pandas.groupby(['sector'])['company_name'].count().reset_index()
df_grouped_sector = df_grouped_sector.rename(columns={'company_name': 'qnt_companies'})
# Display-only: show sectors ordered by company count, smallest first.
# (Original line was truncated to `sort_values(by='qnt_companies', in)` — a
# syntax error; the captured output is ascending and the later matplotlib
# chart uses the unsorted frame, so this was a non-inplace ascending sort.)
df_grouped_sector.sort_values(by='qnt_companies', ascending=True)
| sector | qnt_companies | |
|---|---|---|
| 10 | Sectors | 2 |
| 2 | Consumer Staples | 47 |
| 8 | Media | 50 |
| 11 | Technology | 53 |
| 1 | Consumer Discretionary | 65 |
| 5 | Healthcare | 82 |
| 12 | Utilities | 90 |
| 9 | Real Estate | 93 |
| 6 | Industrials | 105 |
| 3 | Energy | 155 |
| 7 | Materials | 210 |
| 4 | Finance | 366 |
| 0 | - | 1263 |
%%time
# Stacked bar chart: company count per sector, colored by industry.
bar_kwargs = dict(
    x='sector',
    y='company_name',
    color='industry',
    title='No. Companies per Sector and Industry',
    labels={'company_name': '# Companies'},
    height=500,
)
fig = px.bar(df_grouped, **bar_kwargs)
fig.show()
CPU times: user 466 ms, sys: 452 ms, total: 918 ms Wall time: 944 ms
# Bar chart of company counts per sector, each bar annotated with its count.
plt.figure(figsize=(10, 6))
sectors = df_grouped_sector['sector']
counts = df_grouped_sector['qnt_companies']
bars = plt.bar(sectors, counts, color='blue')
plt.xlabel('Sector')
plt.ylabel('# Companies')
plt.title('# Companies per Sector')
plt.xticks(rotation=45, ha='right')
# Write each bar's count just above its top edge.
for rect in bars:
    bar_height = rect.get_height()
    plt.text(rect.get_x() + rect.get_width() / 2, bar_height,
             str(int(bar_height)),
             ha='center', va='bottom', fontsize=10, color='black')
plt.tight_layout()
plt.show()
# Release Spark resources now that all processing is done.
spark.stop()
| Eder Valderrama | eder.valderrama@usp.br |